Providing dynamic code to the cluster

Code to be executed on the cluster can be installed beforehand, or can be provided dynamically in two ways:

  1. Added as a library to the Spark context for a running pyspark session
  2. Defined inline, and passed as a closure during operations (map/reduce etc.)

1. Adding a library to the Spark context

A library of Python code can be imported into the client (i.e. here) and added to the cluster.

  • Importing the library makes its functionality available here, in the notebook
  • Adding the library to the Spark context makes its functionality available to the slave nodes

You would use this for a Python library that isn't (yet) formally installed on the slave nodes.


In [1]:
# Import library
import pe
# Add this same library to the cluster via the Spark context
sc.addPyFile('./pe.py')

help(pe)


Help on module pe:

NAME
    pe - Generate permutation entropy maps for a dataset.

FILE
    /home/ubuntu/pe.py

FUNCTIONS
    pe_single(TS, m, t)
        Calculate permutation entropy for a single timeseries.
        
        TS: time series column vector
        m: order of permutation entropy (e.g. 5)
        t: delay of permutation entropy (e.g. 2)
    
    write_pe_map(pes, m, t, out_filename)
        Write a set of PEs out to a HDF5 file.
        
        The output data is placed in a chunk called "m<order>t<delay>", with meta-data values "order" and "delay".
        
        pes: Dictionary of (inj,fb) to PE floating point value
        m: order of permutation entropy
        t: delay of permutation entropy
        out_filename: path to HDF5 file that is appended with data.


2. Defining a function to be executed on the data

This is a function that can be applied to each item in an RDD.

We define it here in the notebook; it can be serialised and sent out for execution on the cluster slave nodes.


In [2]:
import h5py

def pe_shard(m, t, raw_file):
    h5f = h5py.File(raw_file.local_path(), 'r')  # Open (read-only) from locally cached file
    results = dict()
    for ds in h5f:
        inj = h5f[ds].attrs['INJ']
        fb = h5f[ds].attrs['FB']
        TS = h5f[ds][()]
        pe_result = pe.pe_single(TS, m, t)  # pe.pe_single(): added to cluster
        results[(inj, fb)] = pe_result
    h5f.close()
    return results

Cluster data

We acquire a dataset from a repository.


In [3]:
from bdkd import datastore as ds

repo = ds.repositories().get('bdkd-laser-public')
dataset = repo.get('datasets/Sample dataset')
len(dataset.files)


Out[3]:
1094

In this case we want to work on a subset of the files in the dataset. We can pick them by name: it so happens that the files we want have names containing "FB_", so we can filter on that.


In [4]:
raw_files = dataset.files_matching('FB_')
len(raw_files)


Out[4]:
1092

Cluster computing

We can parallelise the list of files, creating what Spark calls a "resilient distributed dataset" (RDD). This is basically a dataset that can be spread around the cluster and acted upon.

In this case we will break it up into a number of pieces equal to the number of files (i.e. numSlices=len(raw_files)). This causes each file to be processed individually. Slicing is a strategic decision: how you slice up your data can affect the efficiency of processing.
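
To see how a slicing choice plays out, you can inspect the partitioning of a small RDD locally. This is just a minimal sketch (not part of the analysis, with made-up data); glom() gathers each partition into a list so you can see how the elements were distributed.

In [ ]:
# Hypothetical example: 6 items split into 3 vs. 6 slices
demo = list(range(6))
print(sc.parallelize(demo, numSlices=3).glom().collect())   # e.g. [[0, 1], [2, 3], [4, 5]]
print(sc.parallelize(demo, numSlices=6).glom().collect())   # e.g. [[0], [1], [2], [3], [4], [5]]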


In [5]:
raw_rdd = sc.parallelize(raw_files, numSlices=len(raw_files))

This is where computation on the cluster happens.

Notes:

  • We are acting on the RDD defined above (raw_rdd).
  • We are performing two actions: map and reduce. These are sent out to the cluster, with the data.
    • Note that lambda defines an anonymous function/closure
  • map: apply the pe_shard() function (defined above) to each item in the RDD.
  • reduce: bring the results back together by merging the dictionaries of results (a local illustration of this merge follows this list).
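
The merge performed in the reduce step can be illustrated locally, without the cluster. This is a minimal sketch using made-up keys and values; it mirrors the behaviour of the lambda passed to reduce() below, where later results win on any (unlikely) key collision.

In [ ]:
# Local illustration only: hypothetical per-file result dictionaries
partial_a = {(0, 0): 0.71, (0, 1): 0.69}
partial_b = {(1, 0): 0.73}

def merge(x, y):
    merged = dict(x)   # copy, so neither input is mutated
    merged.update(y)   # later results win on key collisions
    return merged

merge(partial_a, partial_b)
# result contains all three (inj, fb) keys with their values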

In [24]:
m = 6  # order of permutation entropy
t = 5  # delay

### CLUSTER ###
pes = raw_rdd\
    .map(lambda raw_file: pe_shard(m, t, raw_file))\
    .reduce(lambda x, y: dict(x, **y))  # merge the per-file result dictionaries
### CLUSTER ###

Check that the size of the results is equal to the number of timeseries.


In [19]:
len(pes)


Out[19]:
88101
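
If you want to persist these results, the pe module's write_pe_map() (documented above) takes exactly this dictionary. A minimal usage sketch follows; the output filename is hypothetical.

In [ ]:
# Append the PE results to an HDF5 file (filename chosen for illustration)
pe.write_pe_map(pes, m, t, 'pe_results.hdf5')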

The results we just generated are a mapping of (injection, feedback) --> entropy. We can turn this into a 2D array indexed by injection and feedback.


In [25]:
(max_inj, max_fb) = sorted(pes.keys())[-1]
pe_array = []
for inj in range(max_inj + 1):
    row = []
    for fb in range(max_fb + 1):
        row.append(pes[(inj, fb)])
    pe_array.append(row)
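
Equivalently, the grid can be filled into a pre-allocated NumPy array rather than nested lists. This is only a sketch (the name pe_grid is illustrative) and, like the loop above, it assumes every (inj, fb) pair in the grid is present in pes.

In [ ]:
import numpy as np

pe_grid = np.empty((max_inj + 1, max_fb + 1))
for (inj, fb), value in pes.items():
    pe_grid[inj, fb] = value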

Displaying results

We can plot the permutation entropy calculations as a heat map.

First, we need the feedback and injection range maps provided with the dataset. These are stored in an HDF5 file called "maps.hdf5", which we can acquire by name and open.


In [22]:
import h5py

maps = h5py.File(dataset.file_ending('maps.hdf5').local_path(), 'r')  # open read-only
FBT = maps['FBT_map.csv'][()]
INJ = maps['INJ_map.csv'][()]

### Contents of file:
# maps.items()

We create a heatmap plot of injection versus feedback versus permutation entropy (calculated above), with a color bar on the side.

The IPython magic "%matplotlib inline" allows us to display plots within this page.


In [28]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt

mapX = np.array(INJ)
mapY = np.array(FBT)
mapZ = np.array(pe_array)  # calculated above
fig = plt.figure()
plt.pcolor(mapX, mapY, mapZ)
plt.gca().set_xlim(np.min(mapX), np.max(mapX))
plt.gca().set_ylim(np.min(mapY), np.max(mapY))
plt.colorbar()


Out[28]:
<matplotlib.colorbar.Colorbar instance at 0x7f0793547fc8>
